package com.xcc.coprocessor;

import com.xcc.constant.CommonConstant;
import com.xcc.constant.Names;
import com.xcc.dao.BaseDao;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class MyObserver extends BaseRegionObserver {

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        super.postPut(e, put, edit, durability);
        //1、获取需要操作的表
        String targetTableName = Names.TABLE.getValue();
        //2、获取当前操作的表
        String currentTableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
        //3、判断需要操作的表是否就是当前表，如果不是，则return
        if (!StringUtils.equals(targetTableName, currentTableName)) return;

        //4、得到当前插入数据的值并封装新的数据，oriRowkey举例：01_15369468720_20170727081033_13720860202_0180_1
        String oriRowKey = Bytes.toString(put.getRow());
        String[] splits = oriRowKey.split("_");
        String flag = splits[5];
        //如果当前插入的是被叫数据，则直接返回(因为默认提供的数据全部为主叫数据)
        if(StringUtils.equals(flag, CommonConstant.FLAG_FLASE.toString())) return;
        //当前插入的数据描述
        String call1 = splits[1];
        String call2 = splits[3];
        String calltime = splits[2];
        String duration = splits[4];
        //组装新的数据所在分区号
        MyDao dao = new MyDao();
        String newFlag = CommonConstant.FLAG_FLASE +"";
        String rowKey = dao.getRowkeyRegion(call2, calltime, CommonConstant.REGION_COUNT) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration+"_"+CommonConstant.FLAG_FLASE;

        //开始存放被叫数据
        Put newPut = new Put(Bytes.toBytes(rowKey));
        newPut.addColumn(Bytes.toBytes(Names.CALLEE.getValue()), Bytes.toBytes("call1"), Bytes.toBytes(call2));
        newPut.addColumn(Bytes.toBytes(Names.CALLEE.getValue()), Bytes.toBytes("call2"), Bytes.toBytes(call1));
        newPut.addColumn(Bytes.toBytes(Names.CALLEE.getValue()), Bytes.toBytes("calltime"), Bytes.toBytes(calltime));
        newPut.addColumn(Bytes.toBytes(Names.CALLEE.getValue()), Bytes.toBytes("duration"), Bytes.toBytes(duration));
        newPut.addColumn(Bytes.toBytes(Names.CALLEE.getValue()), Bytes.toBytes("flag"), Bytes.toBytes(newFlag));

        Table hTable = e.getEnvironment().getTable(TableName.valueOf(targetTableName));
        hTable.put(newPut);
        hTable.close();
    }

    /*public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.getValue()));
        String row = Bytes.toString(put.getRow());
        //3_19683537146_20190927133319_14171709460_0111_1
        String[] arr = row.split("_");
        String call1 = arr[1];
        String calltime = arr[2];
        String call2 = arr[3];
        String duration = arr[4];
        String flag = arr[5];

        if (CommonConstant.FLAG_TURE.equals(Integer.valueOf(flag))) {
            //生成被叫数据
            MyDao dao = new MyDao();
            String newRow = dao.getRowkeyRegion(call2, calltime, CommonConstant.REGION_COUNT) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration+"_"+CommonConstant.FLAG_FLASE;
            Put newPut = new Put(Bytes.toBytes(newRow));
            newPut.addColumn(Bytes.toBytes(Names.CALLEE.getValue()), Bytes.toBytes("call1"), Bytes.toBytes(call2));
            newPut.addColumn(Bytes.toBytes(Names.CALLEE.getValue()), Bytes.toBytes("calltime"), Bytes.toBytes(calltime));
            newPut.addColumn(Bytes.toBytes(Names.CALLEE.getValue()), Bytes.toBytes("call2"), Bytes.toBytes(call1));
            newPut.addColumn(Bytes.toBytes(Names.CALLEE.getValue()), Bytes.toBytes("duration"), Bytes.toBytes(duration));

            table.put(put);
            table.close();
        }

    }*/

    private class MyDao extends BaseDao{

    }

}
